
[core][scalability] Change ray syncer from unary call to streaming call #30460

Merged: 46 commits, Jan 26, 2023

Conversation

@fishbone (Contributor) commented Nov 18, 2022

Why are these changes needed?

With unary calls, it's hard to make resource broadcasting fault tolerant, since a lot of state needs to be maintained across failures.

This PR updates the communication protocol from unary calls to a streaming call.

The protocol changes in several ways (see the sketch after this list):

  • Once a message is received, it is pushed immediately. Writes are buffered (512 KB), so the cost is small.
  • If there are no more messages, or the buffer is exceeded, the buffer is flushed.
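As a rough illustration of this push-and-buffer behavior, here is a minimal, self-contained sketch; all names (SyncMessage, BufferedSender, kBufferLimitBytes) are illustrative stand-ins, not the actual types in this PR:

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

constexpr std::size_t kBufferLimitBytes = 512 * 1024;  // the 512 KB buffer mentioned above

struct SyncMessage {
  std::string payload;
};

class BufferedSender {
 public:
  // Push each message as soon as it is received; flush once the buffered
  // bytes exceed the limit or there is nothing more waiting to be batched.
  void Push(SyncMessage msg, bool more_pending) {
    buffered_bytes_ += msg.payload.size();
    queue_.push_back(std::move(msg));
    if (buffered_bytes_ >= kBufferLimitBytes || !more_pending) {
      Flush();
    }
  }

 private:
  void Flush() {
    // In the real code this corresponds to writing on the gRPC stream.
    queue_.clear();
    buffered_bytes_ = 0;
  }

  std::deque<SyncMessage> queue_;
  std::size_t buffered_bytes_ = 0;
};
```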

The PR has been tested with 2k nodes (2 CPUs per node) and 14k actors.

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@fishbone fishbone changed the title Syncer stream [core] Change ray syncer from unary call to streaming call Nov 19, 2022
@fishbone fishbone marked this pull request as ready for review November 19, 2022 04:31
@fishbone fishbone changed the title [core] Change ray syncer from unary call to streaming call [core][scalability] Change ray syncer from unary call to streaming call Nov 23, 2022
@rkooo567 (Contributor):

Is this ready to review? Btw, before merging more advanced features like this, should we turn on syncer by default first? Or is this necessary for that step?

@fishbone (Author) commented Nov 26, 2022

@rkooo567 No, we shouldn't turn on the syncer by default. My original plan was to roll it out step by step, but I realized that even the unary syncer alone is a big feature that involves a lot of testing. Besides, handling failures with the unary RPC is very hard, since a lot of state needs to be kept. So I'd prefer longer testing and a slow rollout. The streaming approach is also simpler than the unary one and thus should be easier to maintain. I don't think we should spend time testing and staging this feature twice.

Thanks to the abstraction we made, most of the changes are in the communication layer, so only the sending/receiving and connecting/disconnecting logic changed.

Also, there is no intention to keep the protocol exactly the same as before (it's still very similar), since we changed the RPC from unary to streaming. We'll fix issues as we find them during testing.

Btw, I think the gRPC callback API is hard to use correctly without a lot of threading knowledge. After this, I'll try to find a good gRPC framework we can adopt for the ray syncer.

@scv119 (Contributor) left a comment

Thanks for cleaning this up! You might want @rkooo567 to take another look as well.

/// disconnect from the remote node.
/// For the implementation, in the constructor, it needs to connect to the remote
/// node and it needs to implement the communication between the two nodes.
class RaySyncerBidiReactor {
Contributor:

looks much cleaner.

Since RaySyncerBidiReactor is not doing too much, would it make sense to merge RaySyncerBidiReactor into RaySyncerBidiReactorBase?

Author (@fishbone):

Because the server reactor and the client reactor are two different types, and RaySyncer maintains them the same way (it doesn't care about server vs. client), they are all stored in one map.

You can think of this class more as an interface class.

Contributor:

Are you saying it is to keep RaySyncerBidiReactorBase<Server> and RaySyncerBidiReactorBase<Client> in the same map (as the same type)?

Author (@fishbone):

Yes, because from RaySyncer's perspective the logic is the same.
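To make the point concrete, here is a simplified sketch of the arrangement being described (signatures trimmed; the real declarations live in src/ray/common/ray_syncer/ray_syncer-inl.h and carry more state):

```cpp
#include <string>
#include <unordered_map>

// Interface class: RaySyncer only ever deals with this type, so the
// server-side and client-side reactors can be kept in the same map.
class RaySyncerBidiReactor {
 public:
  virtual ~RaySyncerBidiReactor() = default;

  /// Disconnect from the remote node.
  virtual void Disconnect() = 0;

  /// Return the remote node id of this connection.
  const std::string &GetRemoteNodeID() const { return remote_node_id_; }

 protected:
  std::string remote_node_id_;
};

// The shared implementation is templated on the underlying gRPC reactor,
// so RaySyncerBidiReactorBase<ServerReactor> and
// RaySyncerBidiReactorBase<ClientReactor> are distinct types that still
// share the interface above.
template <typename GrpcReactor>
class RaySyncerBidiReactorBase;  // declaration only, for this sketch

// One map, keyed by remote node id, holds both kinds of reactors.
using ReactorMap = std::unordered_map<std::string, RaySyncerBidiReactor *>;
```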

src/ray/common/ray_syncer/ray_syncer-inl.h (resolved thread)
src/ray/common/ray_syncer/ray_syncer-inl.h (resolved thread, outdated)
@rkooo567 (Contributor) left a comment

Maybe we should make the reactor thread-safe? I saw it is used both within the io service and outside it (also, the threading model is getting more confusing; should I assume everything runs in the same thread or in separate threads?).


/// Return the remote node id of this connection.
const std::string &GetRemoteNodeID() const { return remote_node_id_; }

virtual void Disconnect() = 0;
Contributor:

Can you add a docstring to explain what this method is supposed to do?


bool PushToSendingQueue(std::shared_ptr<const RaySyncMessage> message) override {
// Try to filter out the messages the target node already has.
// Usually it'll be the case when the message is generated from the
Contributor:

Can you add a comment with a real-world example? That might be easier to understand.

Author (@fishbone):

This is not about handling a real-world failure case; it's just an optimization to avoid sending a message back to the node it came from.

Contributor:

Oh, I meant the comment will be easier to understand with an example (e.g., a resource update received from GCS doesn't need to be resent).
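A sketch of the optimization with that example folded in; the names (SyncMessage, SendingFilter, known_versions_) are illustrative, not the actual PushToSendingQueue implementation:

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>

// Illustrative stand-in for the RaySyncMessage proto.
struct SyncMessage {
  std::string origin_node_id;  // node whose state this message describes
  int64_t version;             // monotonically increasing per origin node
};

class SendingFilter {
 public:
  explicit SendingFilter(std::string remote_node_id)
      : remote_node_id_(std::move(remote_node_id)) {}

  // Returns true when the message should be queued for the target node.
  // A message is dropped if it originated from the target itself (e.g., a
  // resource update received from GCS is not sent back to GCS) or if the
  // target is known to already hold an equal-or-newer version.
  bool ShouldSend(const SyncMessage &msg) {
    if (msg.origin_node_id == remote_node_id_) return false;
    auto it = known_versions_.find(msg.origin_node_id);
    if (it != known_versions_.end() && it->second >= msg.version) return false;
    known_versions_[msg.origin_node_id] = msg.version;
    return true;
  }

 private:
  std::string remote_node_id_;
  std::unordered_map<std::string, int64_t> known_versions_;
};
```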

src/ray/common/ray_syncer/ray_syncer-inl.h (resolved thread)
io_context_.dispatch([this]() { SendNext(); }, "");
} else {
// No need to resent the message since if ok=false, it's the end
// of gRPC call and we'll reconnect in case of a failure.
Contributor:

Can you also explain what will reconnect, instead of saying "we"? The caller? The client-side syncer?

Contributor:

Also, briefly explain which callback gRPC will invoke (it looks like it is OnDone?).

io_context_,
[this](auto msg) { BroadcastRaySyncMessage(msg); },
[this, channel](const std::string &node_id, bool restart) {
sync_reactors_.erase(node_id);
Contributor:

No need to call Disconnect?

Author (@fishbone):

No. This is the cleanup callback, so it means Disconnect has already been called.

if (restart) {
RAY_LOG(INFO) << "Connection is broken. Reconnect to node: "
<< NodeID::FromBinary(node_id);
Connect(node_id, channel);
Contributor:

If the failure is because the channel is closed from the server, is it going to recover it? No need to create a new channel?

Author (@fishbone):

The channel will reconnect by itself, and if it keeps failing for a long time, the process will crash by Ray's design. There's no need to recreate the channel.
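To make the failure path concrete, here is a minimal sketch using the gRPC C++ callback API; the message type is a placeholder and the comments summarize the discussion above rather than the actual reactor code:

```cpp
#include <grpcpp/grpcpp.h>
#include <grpcpp/support/client_callback.h>

struct SyncMessage {};  // placeholder for the RaySyncMessage proto

class SketchReactor : public grpc::ClientBidiReactor<SyncMessage, SyncMessage> {
  void OnWriteDone(bool ok) override {
    if (ok) {
      // Success: schedule the next buffered write (the real code
      // dispatches SendNext() onto the io context).
      return;
    }
    // ok == false means the RPC is ending; nothing is resent here.
    // gRPC will eventually invoke OnDone() below.
  }

  void OnDone(const grpc::Status &status) override {
    // Called exactly once when the RPC has fully finished. The syncer's
    // cleanup callback runs at this point: it erases the reactor from
    // sync_reactors_ and, if the disconnect was not intentional,
    // reconnects using the same channel (the channel re-establishes its
    // own transport, so no new channel is created).
  }
};
```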

for (const auto &[_, messages] : node_state_->GetClusterView()) {
-  for (auto &message : messages) {
+  for (const auto &message : messages) {
Contributor:

Can we rename the GetClusterView API to GetPendingMessages or something? Iterating messages from the cluster view sounds confusing.

Author (@fishbone):

This actually is fetching the cluster view: it's a new node, and we need to send it the snapshot.

Contributor:

Hmm, the terminology is a bit confusing to me, but that's probably because I'm not very familiar with the syncer (being able to iterate messages returned from ClusterView sounds a bit weird to me).

Author (@fishbone):

ClusterView is a map from node -> status of the node.
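Under that description, the view has roughly the following shape (a sketch with assumed names; kComponentCount and the placeholder message type are illustrative):

```cpp
#include <array>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>

struct RaySyncMessage {};                   // placeholder for the proto type
constexpr std::size_t kComponentCount = 2;  // assumption: one slot per synced component

// Cluster view: node id -> the latest sync message per component for that
// node. Sending the snapshot to a newly connected node means iterating
// every entry and pushing each stored message, which is exactly the loop
// quoted above.
using ClusterView = std::unordered_map<
    std::string,
    std::array<std::shared_ptr<const RaySyncMessage>, kComponentCount>>;
```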

}

void RaySyncer::Disconnect(const std::string &node_id) {
std::promise<RaySyncerBidiReactor *> promise;
Contributor:

This promise-and-block pattern seems to be used frequently. Is there any way to make it a general method? Something like postBlocking()?

Author (@fishbone):

Maybe something like io_context.sync_run()? I don't want to add it here; I'll try to include it in the threading work.
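For reference, a minimal sketch of such a helper (sync_run is the hypothetical name from the comment above); it assumes an io context exposing the dispatch(callable, name) method seen elsewhere in this PR, and it handles non-void results only:

```cpp
#include <future>

// Hypothetical helper: run `fn` on the io context and block the calling
// thread until it completes, returning the result. A production version
// would need a void overload and more care with lifetimes.
template <typename IoContext, typename Fn>
auto sync_run(IoContext &io_context, Fn fn) {
  using Result = decltype(fn());
  std::promise<Result> promise;
  io_context.dispatch([&promise, &fn]() { promise.set_value(fn()); }, "sync_run");
  return promise.get_future().get();
}
```

RaySyncer::Disconnect below could then fetch the reactor with a single sync_run call instead of spelling out the promise each time.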

},
"RaySyncerDisconnect");
auto reactor = promise.get_future().get();
if (reactor != nullptr) {
reactor->Disconnect();
Contributor:

Doesn't it need to be protected?

Author (@fishbone):

It's not necessary to protect this.

But I'll make sure all public methods run in the io context.

@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jan 20, 2023
@fishbone (Author):

> I saw it is used both within the io service and outside it

@rkooo567 I agree with you on this. The root cause is that both the gRPC thread and the io thread call some of these methods.

I think we need a wrapper on gRPC to fix it.

@fishbone fishbone removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jan 20, 2023
@rkooo567 (Contributor) left a comment

LGTM. It'll be great if we have the fault tolerance semantics documented somewhere.

Also, my main concern is that the threading model is confusing and hard to understand (though it is the same across Ray 😞). I guess wrapping gRPC will make things clearer, so we should do it asap!

io_context_.dispatch([this]() { SendNext(); }, "");
} else {
RAY_LOG_EVERY_N(ERROR, 100)
<< "Failed to send the message to: " << NodeID::FromBinary(GetRemoteNodeID());
Contributor:

Not resent because we will recreate the connection and the message will be sent from there?

@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jan 24, 2023
@fishbone (Author):

> LGTM. It'll be great if we have the fault tolerance semantics documented somewhere.

> Also, my main concern is that the threading model is confusing and hard to understand (though it is the same across Ray 😞). I guess wrapping gRPC will make things clearer, so we should do it asap!

Threading is a big topic and will be the next step. This might not be the final solution we choose; we'll see. But it will be the next step after lightweight resource broadcasting is enabled.

@fishbone (Author):

> Not resent because we will recreate the connection and the message will be sent from there?

Yes!

@fishbone (Author) commented Jan 24, 2023

Merged master into this PR.

@rkooo567 (Contributor) commented Jan 24, 2023

Before merging, should we do the following?

  1. Turn on the flag (and turn it off again before merging) and check CI
  2. Run relevant nightly tests with the flag on

@fishbone (Author) commented Jan 24, 2023

@rkooo567 I've tested them (CI: https://buildkite.com/ray-project/oss-ci-build-pr/builds/10223#_). A GCS FT related test failed; I'm not sure of the root cause yet.
I also ran many-actor tests manually, and it looks good with 250 nodes (a 10% improvement in throughput).

Overall it's ok.

Given this is a big PR changing the communication layers, I plan to check the CI test failures quickly and, if they're hard, fix them later.
(That keeps the PR cleaner and easier to review.)

@fishbone (Author):

> It'll be great if we have the fault tolerance semantics documented somewhere.

@rkooo567 I actually put some of the workflow here.

It at least gives developers an overview of how the state changes.

@fishbone (Author):

The GCS FT test is fixed.

@fishbone (Author):

All tests passed with the flag on: https://buildkite.com/ray-project/oss-ci-build-pr/builds/10260
The ray syncer test failed in ASAN. I'm going to take another look, but I feel it's close.

@fishbone fishbone merged commit 1f3226e into ray-project:master Jan 26, 2023
Labels: @author-action-required (The PR author is responsible for the next step.)

Successfully merging this pull request may close these issues: Light weight resource broadcasting testing

3 participants